iT邦幫忙

2023 iThome 鐵人賽

DAY 10
1

今天這段其實不是開發 Airflow 必要的部份,甚至會多增加一點麻煩。但我個人還是比較喜歡這種寫法,所以來寫一些。

前面有提到 PythonOperator 是 Airflow 內最泛用又彈性的 operator,但是它寫起來大概會像這樣:

def process_data():
		#...do something
		result = get_result()
    push_result_to_xcom(result)
	  

def load_data():
    data = get_data_from_xcom()
    #...do something
    pass

task1 = PythonOperator(
    task_id='process_data_task',
    python_callable=process_data,
    provide_context=True,
    dag=dag,
)

task2 = PythonOperator(
    task_id='process_data_task',
    python_callable=load_data,
    provide_context=True,
    dag=dag,
)

task1 >> task2

這種寫法某種程度上,不是很 python,如果不熟悉 Airflow 框架的人可能一時間看不懂。而且資料的關係被隱藏在 XCom 裡 (假設兩個 task 需要傳遞),看起來也有一點累贅。所以 Airflow 後來出了新的 feature 叫 TaskFlow,可以解決一部份問題。

@task
def process_data():
		#...do something
    result = get_result()
		return result

@task
def load_data(data):
    #...do something
    pass

data = process_data()
load_data(data)

第一段的程式作用可以完全被新的第二段給取代掉,看起來是不是乾淨,易讀許多?你要做的就是加上 @task 這個裝飾器,剩下的 Airflow 會處理掉,包含 xcom 的資料存取。

限制

一、依賴關係的表達可能變複雜

語法糖當然還是有代價的,首先它只能取代掉 PythonOperator,其他的 operator 還是要建立 instance。而且 @task 跟傳統的寫法雖然可以共存,但會增加指定關係的複雜度。我們上面的例子如果在中間要多經過一個 PostgresOperator 的話,寫起來會像這樣:

@task
def process_data():
		#...do something
    result = get_result()
		return result

@task
def load_data(data):
		result = get_from_xcom()
    #...do something
    pass

create_table_task = PostgresOperator(
    task_id='create_table',
    sql='CREATE TABLE IF NOT EXISTS my_table (id serial PRIMARY KEY, data text);',
    postgres_conn_id='my_postgres_conn',  # 使用之前配置的連接
    dag=dag,
)

data = process_data()
data >> create_table_task >> load_data()

經過傳統的 operator 時,還是要使用 >> 符號來表示關係,下游則是依然要從 XCom 取值。如果對程式 style 一致性很注重的人,可能要注意一下。

二、測試時需要多一層 function

這項見仁見智,但我必須先提醒一下。

加上 @task 裝飾之後,在測試裡要呼叫 function 時,不能直接丟參數,或是存取 return value

# This test function will fail
def test_process_data():
    result = process_data()
    assert result == 1

這個測項是會錯誤的,不是因為 result 不是 1 ,而是因為 result 其實已經被包裝進 XCom 物件裡。也是因為這樣,你要傳參數進去的時候也必須包裝進 XCom 物件裡。

解決方法第一個當然就照他的格式,但我個人會用第二種:

# This test function will pass 
def test_process_data():
    result = process_data.function()
    assert result == 1

透過 .function() ,你就可以照一般的 python function 使用,無論是 input 參數或是 output 的 return value,都可以直接正常使用,如此一來在寫測試的時候會輕鬆許多。

優點

除了一開始提到的部份,還有一些優點。有一些參數在傳統寫法內,你要從 context 內取出 (參考 Templates reference — Airflow Documentation (apache.org)),如此一來在程式內你就不得不將 context 做為一個參數,進而增加測試要 mock 掉的物件。

但改用 TaskFlow 的話,你可以寫成這樣:

# This test function will pass 
def process_data(data_interval_start: None, data_interval_end:None):
    get_data_in_time_range(data_interval_start, data_interval_end)

原本 data_interval_start 是要從 context 取得的,或是要辛苦一點在 op_karge 內用 template 定義並帶入 {{data_interval_start}} 。但用 TaskFlow 就可以再簡化這段,而且不用知道 context 這個上下文內容。

而且理所當然的,都已經盡量跟 airflow 的框架解耦合了,你的單元測試會更好寫

小結

  • TaskFlow 是 Airflow 的一個新功能,可以用裝飾器 @task 來簡化 PythonOperator 的寫法,並且自動處理 XCom 的資料傳遞。
  • TaskFlow 的優點是可以讓程式碼更乾淨,易讀,並且可以直接使用 context 內的參數,而不需要提供 context 參數或是用 template 定義。
  • TaskFlow 的限制是不能取代其他的 operator,而且在測試時需要多一層 .function 來呼叫真實function。

上一篇
Airflow 自訂 Operator - Day9
下一篇
Airflow 的單元測試 (一) - Day11
系列文
用 Airflow & Flink 來開發 ETL 吧30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言